package com.danan.data_collector.app;

import com.danan.data_collector.util.ConfigUtil;
import com.danan.data_collector.util.DataStreamUtil;
import com.danan.data_collector.util.SinkUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: NanHuang
 * @Date: 2023/05/28/9:57
 * @Description:
 */
public class DataCollector {

    /**
     * Entry point for the CDC data-collection job.
     *
     * <p>Builds a Flink streaming environment (checkpointing + restart strategy),
     * attaches a CDC source chosen by the {@code data.source.type} property, and
     * sinks every change record to Kafka.
     *
     * @param args exactly two values: [0] the database schema to capture,
     *             [1] the port for Flink's local REST/web UI
     * @throws Exception if the Flink job fails to start or execute
     */
    public static void main(String[] args) throws Exception {

        // Fail fast with the expected usage when the argument count is wrong.
        if (args.length != 2) {
            throw new IllegalArgumentException("Please input correct values: [schema , port]");
        }

        String schema = args[0];
        String port = args[1];

        StreamExecutionEnvironment env = environmentInitializer(schema, port);
        DataStreamSource<String> ds = getDataStream(env, schema);
        ds.sinkTo(SinkUtil.getKafkaSink());
        env.execute();
    }

    /**
     * Selects and creates the CDC source stream based on the
     * {@code data.source.type} configuration property.
     *
     * @param env    the streaming environment to attach the source to
     * @param schema the database schema to capture
     * @return the raw change-record stream (one JSON string per change)
     * @throws IllegalStateException if the property is missing or names an
     *                               unsupported source type
     */
    private static DataStreamSource<String> getDataStream(StreamExecutionEnvironment env, String schema) {
        String sourceType = ConfigUtil.getProperty("data.source.type");
        // Guard against a missing property; otherwise toLowerCase() would throw
        // an opaque NullPointerException.
        if (sourceType == null) {
            throw new IllegalStateException("Property 'data.source.type' is not set. "
                    + "Please select a data source from the list [ mysql , oracle , sqlserver ]");
        }
        // Locale.ROOT keeps the comparison locale-independent (e.g. Turkish 'I').
        switch (sourceType.toLowerCase(Locale.ROOT)) {
            case "mysql":
                return DataStreamUtil.getMySQLDataStream(env, schema);
            case "oracle":
                return DataStreamUtil.getOracleDataStream(env, schema);
            case "sqlserver":
                return DataStreamUtil.getSqlServerDataStream(env, schema);
            default:
                throw new RuntimeException("The data source is not defined. Please select a data source from the list [ mysql , oracle , sqlserver ]");
        }
    }

    /**
     * Creates and configures the streaming environment: web UI port,
     * parallelism, exactly-once checkpointing, and a fixed-delay restart
     * strategy.
     *
     * @param schema database schema; used to namespace the checkpoint path
     * @param port   port for the local Flink REST/web UI
     * @return the fully configured execution environment
     */
    private static StreamExecutionEnvironment environmentInitializer(String schema, String port) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", Integer.parseInt(port)); // web UI port
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(Integer.parseInt(ConfigUtil.getProperty("environment.parallelism")));

        // State backend and checkpointing.
        // env.setStateBackend(new EmbeddedRocksDBStateBackend()); // enable RocksDB state backend
        env.enableCheckpointing(60 * 1000); // checkpoint interval: 60 s
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // at most one checkpoint in flight
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000); // checkpoint timeout: 10 min
        // Keep externalized checkpoints on cancel so the job can be resumed manually.
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointStorage("file:///opt/module/ods-1.0.0/ck/collector/emr/" + schema); // checkpoint storage path
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3600);

        // Failure restart strategy: 3 attempts, 30 s apart.
        // BUG FIX: the (int, long) overload takes the delay in MILLISECONDS;
        // the previous value of 30 restarted after only 30 ms.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 30 * 1000L));

        return env;
    }
}
